/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager.security;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;

/**
 * The NM maintains only two master-keys. The current key that RM knows and the
 * key from the previous rolling-interval.
 * 
 */
public class NMContainerTokenSecretManager extends
    BaseContainerTokenSecretManager {

  private static final Log LOG = LogFactory
      .getLog(NMContainerTokenSecretManager.class);
  
  private MasterKeyData previousMasterKey;
  
  private final Map<ApplicationId, ConcurrentMap<ContainerId, MasterKeyData>> oldMasterKeys;
  
  public NMContainerTokenSecretManager(Configuration conf) {
    super(conf);
    this.oldMasterKeys =
        new HashMap<ApplicationId, ConcurrentMap<ContainerId, MasterKeyData>>();
  }

  /**
   * Used by NodeManagers to create a token-secret-manager with the key obtained
   * from the RM. This can happen during registration or when the RM rolls the
   * master-key and signals the NM.
   * 
   * @param masterKeyRecord
   */
  @Private
  public synchronized void setMasterKey(MasterKey masterKeyRecord) {
    LOG.info("Rolling master-key for container-tokens, got key with id "
        + masterKeyRecord.getKeyId());
    if (super.currentMasterKey == null) {
      super.currentMasterKey = new MasterKeyData(masterKeyRecord);
    } else {
      if (super.currentMasterKey.getMasterKey().getKeyId() != masterKeyRecord
          .getKeyId()) {
        // Update keys only if the key has changed.
        this.previousMasterKey = super.currentMasterKey;
        super.currentMasterKey = new MasterKeyData(masterKeyRecord);
      }
    }
  }

  /**
   * Override of this is to validate ContainerTokens generated by using
   * different {@link MasterKey}s.
   */
  @Override
  public synchronized byte[] retrievePassword(
      ContainerTokenIdentifier identifier) throws SecretManager.InvalidToken {
    int keyId = identifier.getMasterKeyId();
    ContainerId containerId = identifier.getContainerID();
    ApplicationId appId =
        containerId.getApplicationAttemptId().getApplicationId();

    MasterKeyData masterKeyToUse = null;
    if (this.previousMasterKey != null
        && keyId == this.previousMasterKey.getMasterKey().getKeyId()) {
      // A container-launch has come in with a token generated off the last
      // master-key
      masterKeyToUse = this.previousMasterKey;
    } else if (keyId == super.currentMasterKey.getMasterKey().getKeyId()) {
      // A container-launch has come in with a token generated off the current
      // master-key
      masterKeyToUse = super.currentMasterKey;
    } else if (this.oldMasterKeys.containsKey(appId)
        && this.oldMasterKeys.get(appId).containsKey(containerId)) {
      // This means on the following happened:
      // (1) a stopContainer() or a getStatus() happened for a container with
      // token generated off a master-key that is neither current nor the
      // previous one.
      // (2) a container-relaunch has come in with a token generated off a
      // master-key that is neither current nor the previous one.
      // This basically lets stop and getStatus() calls with old-tokens to pass
      // through without any issue, i.e. (1).
      // Start-calls for repetitive launches (2) also pass through RPC here, but
      // get thwarted at the app-layer as part of startContainer() call.
      masterKeyToUse = this.oldMasterKeys.get(appId).get(containerId);
    }

    if (masterKeyToUse != null) {
      return retrievePasswordInternal(identifier, masterKeyToUse);
    }

    // Invalid request. Like startContainer() with token generated off
    // old-master-keys.
    throw new SecretManager.InvalidToken("Given Container "
        + identifier.getContainerID().toString()
        + " seems to have an illegally generated token.");
  }

  /**
   * Container start has gone through. Store the corresponding keys so that
   * stopContainer() and getContainerStatus() can be authenticated long after
   * the container-start went through.
   */
  public synchronized void startContainerSuccessful(
      ContainerTokenIdentifier tokenId) {
    if (!UserGroupInformation.isSecurityEnabled()) {
      return;
    }

    int keyId = tokenId.getMasterKeyId();
    if (currentMasterKey.getMasterKey().getKeyId() == keyId) {
      addKeyForContainerId(tokenId.getContainerID(), currentMasterKey);
    } else if (previousMasterKey != null
        && previousMasterKey.getMasterKey().getKeyId() == keyId) {
      addKeyForContainerId(tokenId.getContainerID(), previousMasterKey);
    }
  }

  /**
   * Ensure the startContainer call is not using an older cached key. Will
   * return false once startContainerSuccessful is called. Does not check
   * the actual key being current since that is verified by the security layer
   * via retrievePassword.
   */
  public synchronized boolean isValidStartContainerRequest(
      ContainerTokenIdentifier tokenId) {
    ContainerId containerID = tokenId.getContainerID();
    ApplicationId applicationId =
        containerID.getApplicationAttemptId().getApplicationId();
    return !this.oldMasterKeys.containsKey(applicationId)
        || !this.oldMasterKeys.get(applicationId).containsKey(containerID);
  }

  private synchronized void addKeyForContainerId(ContainerId containerId,
      MasterKeyData masterKeyData) {
    if (containerId != null) {
      ApplicationId appId =
          containerId.getApplicationAttemptId().getApplicationId();
      if (!this.oldMasterKeys.containsKey(appId)) {
        this.oldMasterKeys.put(appId,
          new ConcurrentHashMap<ContainerId, MasterKeyData>());
      }
      ConcurrentMap<ContainerId, MasterKeyData> containerIdToKeysMapForThisApp =
          this.oldMasterKeys.get(appId);
      containerIdToKeysMapForThisApp.put(containerId, masterKeyData);
    } else {
      LOG.warn("Not adding key for null containerId");
    }
  }

  // Holding on to master-keys corresponding to containers until the app is
  // finished due to the multiple ways a container can finish. Avoid
  // stopContainer calls seeing unnecessary authorization exceptions.
  public synchronized void appFinished(ApplicationId appId) {
    this.oldMasterKeys.remove(appId);
  }
}